查看原文
其他

C# 之 RPC 很火么?

DotNet 2021-09-23

(给DotNet加星标,提升.Net技能

转自:Supper_litt
cnblogs.com/Supperlitt/p/14370804.html

前言


RPC,听过很有段时间了,但是一直都不太清楚是干嘛的,今天我们来捋一捋。


解释:


【Remote Procedure Call Protocol】远程过程调用(就是说,A程序要调用一个b方法,然而这个b方法的实现在B程序内部,B程序还可能和A不在一个电脑上面,怎么调用?http可以调用/rpc也可以,让他像调用本地方法一样调用)


使用初探


用了一下市面上的,rpc框架,步骤如下:


1、写一个基本的代码,告诉有哪些方法


2、然后服务端集成


3、客户端集成


4、OK调用生效了


感觉有点像TCP在传输数据,从A服务器传递,传递类名,方法名,参数,值,然后B服务器拿到数据,计算结果,然后把数据在回传给A。。。这样理解一下的话,就很简单了。


下面动手写一个吧。


自己动手


服务端


既然服务端是实现的地方,我们写一个算是实现类的方法试试:写了一个接口和一个实现,为了演示效果,写了两个方法。


public interface IMyTestService
{
int calc(int x, int y);
bool login(string name, string pwd);
}
public class MyTestServiceImpl : IMyTestService
{
public int calc(int x, int y)
{
return x + y;
}
public bool login(string name, string pwd)
{
if (name == "test" && pwd == "123456")
{
return true;
}
return false;
}
}


OK,服务端的大部分完成了。


然后就是TCP服务器,TCP服务器对大家来说,就太简单不过了,不就是创建一个Socket对象,绑定一个端口,获取客户端请求的Socket对象,然后和他交互么。没啥多说的。


class Program
{
static void Main(string[] args)
{
Socket server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
server.Bind(new IPEndPoint(IPAddress.Any, 10000));
server.Listen(1000);
Thread t = new Thread(Execute);
t.IsBackground = true;
t.Start(server);
Console.WriteLine("rpc服务器已启动");
Console.ReadLine();
}
private static void Execute(Object obj)
{
Socket server = obj as Socket;
while (true)
{
Socket client = server.Accept();
Thread t = new Thread(SingleExecute);
t.IsBackground = true;
t.Start(client);
}
}
private static void SingleExecute(object obj)
{
// 读取
Socket client = obj as Socket;
byte[] buffer = new byte[8192];
int count = client.Receive(buffer);
if (count > 0)
{
var data = ServiceHelpercs.Handle(buffer);
client.Send(data);
}
client.Shutdown(SocketShutdown.Both);
}
}


我们假定,所有的客户端数据,可以在一个请求包里面解析掉,因为如果一次的数据接收不能解析,那就还要添加一个大小了,客户端要告诉我你给我了多少消息,然后我再读取指定数据,拿到所有的内容


这里创建,一个ServiceHelpers来帮助对,真实算法的调用。如下:


public class ServiceHelpercs
{
public static byte[] Handle(byte[] buffer)
{
MemoryStream ms = new MemoryStream(buffer);
BinaryReader br = new BinaryReader(ms);
int inter_len = br.ReadByte();
string inter_name = Encoding.UTF8.GetString(br.ReadBytes(inter_len));
int method_len = br.ReadByte();
string method_name = Encoding.UTF8.GetString(br.ReadBytes(method_len));
int args_length = br.ReadByte();
int return_type = br.ReadByte();
List<object> list = new List<object>();
for (int i = 0; i < args_length; i++)
{
       // 0:void 忽略 1:int 2:bool 3:string
int arg_type = br.ReadByte();
if (arg_type == 1)
{
byte[] values = br.ReadBytes(4);
list.Add(bytes2int(values));
}
else if (arg_type == 2)
{
bool value = br.ReadByte() == 1;
list.Add(value);
}
else if (arg_type == 3)
{
int str_len = bytes2int(br.ReadBytes(4));
string str = Encoding.UTF8.GetString(br.ReadBytes(str_len));
list.Add(str);
}
}
Type inter_type = null;
var types = Assembly.GetExecutingAssembly().GetTypes();
foreach (var type in types)
{
var ts = type.GetInterfaces();
foreach (var t in ts)
{
if (t.Name == inter_name)
{
inter_type = type;
break;
}
}
}
MethodInfo invokeMethod = null;
if (inter_type != null)
{
var methods = inter_type.GetMethods();
foreach (var method in methods)
{
if (method.Name == method_name)
{
invokeMethod = method;
break;
}
}
}
if (invokeMethod != null)
{
Object thisObj = Activator.CreateInstance(inter_type);
object result = invokeMethod.Invoke(thisObj, list.ToArray());
if (return_type == 1)
{
int value = Convert.ToInt32(result);
reurn int2bytes(value);
}
else if (return_type == 2)
{
return new byte[1] { Convert.ToBoolean(result) ? (byte)1 : (byte)0 };
}
else if (return_type == 2)
{
List<byte> result_data = new List<byte>();
var str = (result == null ? "" : result.ToString());
var data = Encoding.UTF8.GetBytes(str);
result_data.AddRange(int2bytes(data.Length));
result_data.AddRange(data);
return result_data.ToArray();
}
}
return new byte[1] { 0xFF };
}
public static byte[] int2bytes(int len)
{
byte[] data_len = new byte[4];
data_len[0] = (byte)((len >> 8 * 3) & 0xFF);
data_len[1] = (byte)((len >> 8 * 2) & 0xFF);
data_len[2] = (byte)((len >> 8 * 1) & 0xFF);
data_len[3] = (byte)(len & 0xFF);
return data_len;
}
public static int bytes2int(byte[] buffer)
{
int value = 0;
value += (int)(buffer[0] << (8 * 3));
value += (int)(buffer[1] << (8 * 2));
value += (int)(buffer[2] << (8 * 1));
value += (int)(buffer[3]);
return value;
}
}


解析的类很简单,因为这里创建的数据结构很简单。



按照我们的约定,这里,对数据按照我定义的方式来进行解包即可。


服务器就完成了,是不是很简单。当然客户端也需要按照一样的方式处理打包即可


客户端


客户端就很简单了,只需要连接到服务器,通过我们自动生成的代码(这里没有写自动生成,就手动了),然后就直接可以返回结果了

class Program
{
static void Main(string[] args)
{
IMyService service = new MyServiceProxy();
DateTime startTime = DateTime.Now;
int result = service.add(123, 321);
int min_seconds = (int)(DateTime.Now - startTime).TotalMilliseconds;
Console.WriteLine(result + " 耗时 " + min_seconds);
Console.ReadLine();
}
}


上面直接调用了,接口,至于接口的实现,这里的步骤就三个:1、构造需要请求的数据,2、连接服务器并发送数据,3、接收返回内容,并解析结果。


public class MyServiceProxy : IMyService
{
public int add(int x, int y)
{
List<ArgInfo> argList = new List<ArgInfo>();
argList.Add(new ArgInfo(TypeEnu.Int, x));
argList.Add(new ArgInfo(TypeEnu.Int, y));
byte[] send_data = create_send_package("IMyService", "add", 2, TypeEnu.Int, argList);
Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(new IPEndPoint(IPAddress.Parse("192.168.0.105"), 10000));
client.Send(send_data);
byte[] buffer = new byte[4];
int count = client.Receive(buffer);
if (count > 0)
{
return bytes2int(buffer);
}
throw new Exception("系统异常");
}
public bool login(string name, string pwd)
{
List<ArgInfo> argList = new List<ArgInfo>();
argList.Add(new ArgInfo(TypeEnu.String, name));
argList.Add(new ArgInfo(TypeEnu.String, pwd));
byte[] send_data = create_send_package("IMyService", "login", 2, TypeEnu.Bool, argList);
Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
client.Connect(new IPEndPoint(IPAddress.Parse("192.168.0.105"), 10000));
client.Send(send_data);
byte[] buffer = new byte[1];
int count = client.Receive(buffer);
if (count > 0)
{
return buffer[0] == 1;
}
throw new Exception("系统异常");
}
private byte[] create_send_package(string inter_name, string method_name, int arg_length, TypeEnu return_type, List<ArgInfo> argList)
{
List<byte> list = new List<byte>();
list.Add((byte)inter_name.Length);
list.AddRange(Encoding.UTF8.GetBytes(inter_name));
list.Add((byte)method_name.Length);
list.AddRange(Encoding.UTF8.GetBytes(method_name));
list.Add((byte)arg_length);
list.Add((byte)return_type);
foreach (var arg in argList)
{
list.Add((byte)arg.type);
if (arg.type == TypeEnu.Int)
{ list.AddRange(int2bytes(Convert.ToInt32(arg.value)));
}
else if (arg.type == TypeEnu.Bool)
{
bool value = Convert.ToBoolean(arg.value);
list.Add(value ? (byte)1 : (byte)0);
}
else if (arg.type == TypeEnu.String)
{
string value = arg.value.ToString();
list.AddRange(int2bytes(value.Length));
list.AddRange(Encoding.UTF8.GetBytes(value));
}
}
return list.ToArray();
}
public byte[] int2bytes(int len)
{
byte[] data_len = new byte[4];
data_len[0] = (byte)((len >> 8 * 3) & 0xFF);
data_len[1] = (byte)((len >> 8 * 2) & 0xFF);
data_len[2] = (byte)((len >> 8 * 1) & 0xFF);
data_len[3] = (byte)(len & 0xFF);
return data_len;
}
public int bytes2int(byte[] buffer)
{
int value = 0;
value += (int)(buffer[0] << (8 * 3));
value += (int)(buffer[1] << (8 * 2));
value += (int)(buffer[2] << (8 * 1));
value += (int)(buffer[3]);
return value;
}
}
public class ArgInfo
{
public TypeEnu type { get; set; }
public object value { get; set; }
public ArgInfo(TypeEnu type, object value)
{
this.type = type;
this.value = value;
}
}
public enum TypeEnu
{
Void = 0,
Int = 1,
Bool = 2,
String = 3
}


接口的定义沿用服务端的即可。说明一点:MyServiceProxy这个类,这里我是手写的,真实的环境,这个类,应该是由我们定义的某种格式,然后写一个代码生成器,让他自动生成,然后就可以不用费力,兼容所有的调用了,


当然这里只支持了四种类型,我们还可以扩充更多类型,只需要找到传递数据的方式即可。譬如一种对象,我们不知道如何传递,可以直接把对象定义成一个json字符串,或者序列化成二进制,只要两端,都知道了这个类型,就可以了。


相当于设计模式里面的(约定大于配置了)


知识点梳理


这里有一些知识点,是不常用的,这里梳理出来了。


1、MemoryStream ms = new MemoryStream(buffer); BinaryReader br = new BinaryReader(ms); 通过binaryReader的方式,可以像C/C++指针一样取数据


2、var types = Assembly.GetExecutingAssembly().GetTypes(); 通过Assembly可以得到当前exe或者dll的所有类型(类接口都是一种类型)


3、Object thisObj = Activator.CreateInstance(inter_type); 通过Activator调用默认构造,实现对象的初始化


总结


这样一个rpc框架,本身并没有优化,还有很多地方是可以优化的,比如:缓存(不用每次遍历查询类型等),udp支持(这里仅仅只是对tcp进行了支持),


自动代码生成(定义一种规范和支持程序,进行支持),错误重试,数据唯一性,数据包的大小处理,等等,所以想要开发一个易用的框架,还需要不断演进,这里只是对他的原理进行了简单剖析。

代码git:https://github.com/supperlitt/tcp_all


- EOF -


推荐阅读  点击标题可跳转
动态方法拦截(AOP)的N种解决方案C# 规则引擎 RulesEngine当.NET 5遇上OpenTelemetry,会碰撞出怎样的火花?


看完本文有收获?请转发分享给更多人

推荐关注「DotNet」,提升.Net技能 

点赞和在看就是最大的支持❤️

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存